PPO is a state-of-the-art algorithm for deep RL. We'll see how it can be implemented in code (https://github.com/Arpan12/Reinforcement-Learning-Algos/blob/main/PPO_Discrete.ipynb).
We won't go deep into the theory but will focus on the implementation.
Promise
After reading to the end of this blog, you should be able to implement PPO for environments with a discrete action space.
Theory
PPO is a policy gradient method
\(\pi(u|x)\to\) our current policy. In deep RL this is usually approximated by a neural net called the actor net.
\(g(x_n,u_n)\to\) the cost incurred for taking action \(u_n\) in state \(x_n\); we get this from the environment.
\(V_\pi(x_n) = g(x_n,\pi(x_n)) + \alpha\,V_\pi(x_{n+1})\to\) the value function for policy \(\pi\). In deep RL this is usually approximated by a neural net called the critic net.
If \(J(\theta) = V_\pi(x_0)\) is the approximate value function for an episode starting at \(x_0\) and following policy \(\pi_\theta\), we can decrease \(J(\theta)\) by gradient descent: \(\theta_{new} = \theta - \gamma\,\nabla_\theta J(\theta)\)
So, written more mathematically:
\(J(\theta) = E_{u_n\sim \pi_{\theta}}\big[\sum_{n=0}^N\alpha^n\,g(x_n,u_n)\big]\)
It can be shown (the proof is not important here) that
\(\nabla_\theta J(\theta) = E\big[\sum_{n=0}^N\Psi_n\,\nabla_\theta\log \pi(u_n|x_n,\theta)\big]\)
which is similar to minimizing:
\(\min_\theta E\big[\sum_{n=0}^N\Psi_n\,\log \pi(u_n|x_n,\theta)\big]\)
This is what we use for the actor loss and minimize. In practice, since we get rewards (not costs) from the env, we put a negative sign before \(\Psi_n\) so that minimizing the loss maximizes the reward.
For the critic loss we take the squared TD error \(\delta_{td}^2\), where \(\delta_{td} = g(x_n,u_n)+\gamma\,V(x_{n+1})-V(x_n)\)
The choice of \(\Psi_n\) gives the different algorithms:
REINFORCE: \(\Psi_n = G_n = \sum_{k=n}^N \alpha^k\, g(x_k,u_k)\)
REINFORCE with baseline: \(\Psi_n = G_n-V(x_n)\)
Actor-Critic: \(\Psi_n = g(x_n,u_n)+\alpha\,V(x_{n+1})-V(x_n)\)
PPO: \(\Psi_n = A_n = \sum_{k=n}^N (\gamma\lambda)^{k-n}\,\big(Q(x_k,u_k)-V(x_k)\big)\)
where \(Q(x_k,u_k) = g(x_k,u_k)+\gamma\,V(x_{k+1})\)
But for PPO we approximate the objective even further, replacing the log-probability with the probability ratio relative to the old policy: \(\min_\theta E\big[\sum_{n=0}^N A_n\,\frac{\pi(u_n|x_n,\theta)}{\pi(u_n|x_n,\theta_{old})}\big]\)
For PPO we also clip the probability ratio, limiting it to between \(1-\epsilon\) and \(1+\epsilon\):
\(\min_\theta E\big[\sum_{n=0}^N \min\big(A_n\,\frac{\pi(u_n|x_n,\theta)}{\pi(u_n|x_n,\theta_{old})},\ \mathrm{clip}\big(\frac{\pi(u_n|x_n,\theta)}{\pi(u_n|x_n,\theta_{old})},1-\epsilon,1+\epsilon\big)\,A_n\big)\big]\)
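To build a bit of intuition for the clip, here is a small toy example of my own (not from the notebook) showing how the clipped term caps the objective once the ratio moves more than \(\epsilon\) away from 1:

import torch as T

advantage = T.tensor([1.0, 1.0, -1.0])   # positive and negative advantages
ratio = T.tensor([1.5, 0.7, 1.5])        # pi_new / pi_old for three samples
eps = 0.2

unclipped = ratio * advantage
clipped = T.clamp(ratio, 1 - eps, 1 + eps) * advantage
surrogate = T.min(unclipped, clipped)    # element-wise min, as in the PPO objective
print(surrogate)                         # tensor([ 1.2000,  0.7000, -1.5000])

For the first sample the ratio 1.5 is capped at 1.2, so the policy gets no extra credit for moving too far from the old policy; for the last sample (negative advantage) the min keeps the worse, unclipped value, so large harmful moves are still penalized.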
Implementation
Make Actor and Critic NN
Actor NN: takes in a state (you need to make a tensor out of your state; you can't pass a list or an np array) and spits out a Categorical object containing probabilities (because the last layer is a softmax over action_dim).
For example, in our case the observation is a numpy array of shape (4,):
>>> print(f"observation= {observation}")
observation= [ 0.02916372 0.02608052 0.01869606 -0.00384168]
We need to convert it to a tensor before passing it to the NN:
>>> state = T.tensor([observation],dtype=T.float).to(self.actor.device)
>>> print(f"state= {state}")
state= tensor([[ 0.0292, 0.0261, 0.0187, -0.0038]], device='cuda:0')
We get a Categorical distribution object from the NN (the last layer is softmax). We then get an action by sampling from this distribution:
>>> dist = self.actor(state)
>>> print(f"dist {dist} {dist.probs}")
dist Categorical(probs: torch.Size([1, 2])) tensor([[0.4913, 0.5087]], device='cuda:0', grad_fn=<DivBackward0>)
>>> action = dist.sample()
>>> print(f"action = {action}")
action = tensor([1], device='cuda:0')
# Retrieve the action from the action tensor, and its log-probability
# squeeze removes all axes of size 1; item() returns a standard Python number
# (the print is done before action is squeezed to an int, since dist.log_prob expects a tensor)
>>> probs = T.squeeze(dist.log_prob(action)).item()
>>> print(f"log prob {dist.log_prob(action)} squeezed = {T.squeeze(dist.log_prob(action))} Probs = {probs}")
log prob tensor([-0.7533], device='cuda:0', grad_fn=<SqueezeBackward1>) squeezed = -0.753328263759613 Probs = -0.753328263759613
>>> action = T.squeeze(action).item()
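Putting these steps together, the agent's action-selection helper can look roughly like this (a minimal sketch of how the pieces above fit together; the method name choose_action and the exact return values are my assumption, not necessarily what the notebook uses):

def choose_action(self, observation):
    # convert the raw observation into a float tensor on the actor's device
    state = T.tensor([observation], dtype=T.float).to(self.actor.device)
    dist = self.actor(state)        # Categorical distribution over actions
    value = self.critic(state)      # value of this state from the critic
    action = dist.sample()          # sample an action from the distribution
    # return plain Python numbers so they can be stored in the memory buffer
    log_prob = T.squeeze(dist.log_prob(action)).item()
    action = T.squeeze(action).item()
    value = T.squeeze(value).item()
    return action, log_prob, value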
The code for the Actor class. It derives from torch.nn.Module:
import os
import torch as T
import torch.nn as nn
import torch.optim as optim
from torch.distributions.categorical import Categorical

class AgentNetwork(nn.Module):
    def __init__(self, input_dims, action_dim, lr, layer1=256, layer2=256,
                 weight_file='weightFiles/ppo_discrete'):
        super(AgentNetwork, self).__init__()
        self.checkpoint_file = os.path.join(weight_file, 'ppo_actor_weight')
        # input_dims is a tuple like (4,), so *input_dims unpacks it into the
        # integer that nn.Linear expects as its first argument
        self.actor = nn.Sequential(
            nn.Linear(*input_dims, layer1),
            nn.ReLU(),
            nn.Linear(layer1, layer2),
            nn.ReLU(),
            nn.Linear(layer2, action_dim),
            nn.Softmax(dim=-1)
        )
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

    def forward(self, state):
        dist = self.actor(state)
        # Categorical wraps the probability vector into a distribution object
        # that we can sample() actions from and query log_prob() on
        dist = Categorical(dist)
        return dist

    def save_checkpoint(self):
        T.save(self.state_dict(), self.checkpoint_file)

    def load_checkpoint(self):
        self.load_state_dict(T.load(self.checkpoint_file))
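A quick sanity check of the class (my own snippet, assuming a CartPole-like setup with a 4-dimensional observation and 2 actions):

actor = AgentNetwork(input_dims=(4,), action_dim=2, lr=3e-4)
state = T.rand((1, 4), dtype=T.float).to(actor.device)
dist = actor(state)
print(dist.probs.shape)   # torch.Size([1, 2]); the two probabilities sum to 1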
Critic NN: takes in a state (tensor) and returns a tensor containing a single float, the value of that state:
>>> state = T.tensor([observation],dtype=T.float).to(self.actor.device)
>>> value = self.critic(state)
>>> print(f"value {value}")
value tensor([[-0.0977]], device='cuda:0', grad_fn=<AddmmBackward0>)
Code for the Critic class:
class CriticNetwork(nn.Module):
    def __init__(self, input_dims, lr, layer1=256, layer2=256,
                 weight_file='weightFiles/ppo_discrete'):
        super(CriticNetwork, self).__init__()
        self.checkpoint_file = os.path.join(weight_file, 'ppo_critic_weight')
        self.critic = nn.Sequential(
            nn.Linear(*input_dims, layer1),
            nn.ReLU(),
            nn.Linear(layer1, layer2),
            nn.ReLU(),
            nn.Linear(layer2, 1)
        )
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

    def forward(self, state):
        value = self.critic(state)
        return value

    def save_checkpoint(self):
        T.save(self.state_dict(), self.checkpoint_file)

    def load_checkpoint(self):
        self.load_state_dict(T.load(self.checkpoint_file))
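One thing to watch: the checkpoint directory has to exist before save_checkpoint is called, otherwise T.save will fail. A small usage sketch (mine, not from the notebook):

critic = CriticNetwork(input_dims=(4,), lr=3e-4)
os.makedirs('weightFiles/ppo_discrete', exist_ok=True)   # create the weight directory if needed
critic.save_checkpoint()
critic.load_checkpoint()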
Batches and How to generate them
We need to take all the elements in the memory and create batches out of them.
GOAL
1. The elements should not repeat from one batch to another.
2. There should not be any correlation between elements of the same batch.
Here is how we achieve that in code
def generate_batches(self):
    n_states = len(self.states)
    # shuffle the indices, then cut them into consecutive chunks of batch_size,
    # so every element appears in exactly one batch
    indices = np.arange(n_states, dtype=np.int64)
    np.random.shuffle(indices)
    batch_starts = np.arange(0, n_states, self.batch_size)
    batches = [indices[i:i + self.batch_size] for i in batch_starts]

    return np.array(self.states), \
           np.array(self.actions), \
           np.array(self.probs), \
           np.array(self.vals), \
           np.array(self.rewards), \
           np.array(self.dones), \
           batches
# Note: only the batch indices get shuffled. We keep the actual order of states, actions, probs, vals, rewards and dones so that we can calculate the advantage of each state later.
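generate_batches lives on a small memory/buffer class that stores one rollout. The notebook's full class is not reproduced here; a minimal sketch of what it could look like (the attribute names match the snippet above, the rest is my assumption):

import numpy as np

class PPOMemory:
    def __init__(self, batch_size):
        self.states, self.actions, self.probs = [], [], []
        self.vals, self.rewards, self.dones = [], [], []
        self.batch_size = batch_size

    def store_memory(self, state, action, prob, val, reward, done):
        # append one transition; everything stays in time order
        self.states.append(state)
        self.actions.append(action)
        self.probs.append(prob)
        self.vals.append(val)
        self.rewards.append(reward)
        self.dones.append(done)

    def clear_memory(self):
        self.states, self.actions, self.probs = [], [], []
        self.vals, self.rewards, self.dones = [], [], []

    # generate_batches (shown above) is also a method of this class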
Compute the advantage
We know that the advantage of a state is given by the following equation:
\(A_n = \sum_{k=n}^N (\gamma\lambda)^{k-n}\,\big(Q(x_k,u_k)-V(x_k)\big)\)
Here is how we achieve that in code
state_arr, action_arr, probs_arr, vals_arr, \
    rewards_arr, dones_arr, batches = self.memory.generate_batches()

advantages = np.zeros_like(rewards_arr)
# work backwards through the rollout: A[t] = delta_t + gamma*lambda*A[t+1],
# where delta_t is the TD error at step t
for t in reversed(range(len(state_arr) - 1)):
    advantages[t] = rewards_arr[t] + self.gamma * vals_arr[t+1] * (1 - int(dones_arr[t])) \
                    - vals_arr[t] + self.gamma * self.lambda_factor * advantages[t+1]
advantages = T.tensor(advantages).to(self.actor.device)
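A tiny numeric sanity check (my own example, not from the notebook) that the backward recursion above matches the direct sum \(\sum_k (\gamma\lambda)^{k-n}\,\delta_k\) on a 3-step rollout:

import numpy as np

gamma, lam = 0.99, 0.95
rewards = np.array([1.0, 1.0, 1.0])
vals = np.array([0.5, 0.4, 0.3])      # V(x_0), V(x_1), V(x_2)

# TD errors delta_t = r_t + gamma*V(x_{t+1}) - V(x_t) for t = 0, 1
deltas = [rewards[t] + gamma * vals[t+1] - vals[t] for t in range(2)]

# backward recursion, as in the loop above
adv = np.zeros(3)
for t in reversed(range(2)):
    adv[t] = deltas[t] + gamma * lam * adv[t+1]

# direct sum for t = 0
direct = sum((gamma * lam) ** k * deltas[k] for k in range(2))
print(np.isclose(adv[0], direct))     # True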
Loss function
For the actor loss, we know that we need to minimize:
\(\min_\theta E\big[\sum_{n=0}^N \min\big(A_n\,\frac{\pi(u_n|x_n,\theta)}{\pi(u_n|x_n,\theta_{old})},\ \mathrm{clip}\big(\frac{\pi(u_n|x_n,\theta)}{\pi(u_n|x_n,\theta_{old})},1-\epsilon,1+\epsilon\big)\,A_n\big)\big]\)
So we can take the gradient of this function with respect to our weights \(\theta\) and do one step of gradient descent.
This can be done in code by:
states = T.tensor(state_arr[batch], dtype=T.float).to(self.actor.device)
actions = T.tensor(action_arr[batch], dtype=T.float).to(self.actor.device)
old_probs = T.tensor(probs_arr[batch], dtype=T.float).to(self.actor.device)

dist = self.actor(states)
new_probs = dist.log_prob(actions)
# new_probs and old_probs are log-probabilities, so taking exp() of each
# gives the probability ratio pi_new / pi_old
prob_ratio = new_probs.exp() / old_probs.exp()
weighted_prob = advantages[batch] * prob_ratio
weighted_clipped_probs = T.clamp(prob_ratio, 1 - self.policy_clip, 1 + self.policy_clip) * advantages[batch]
actor_loss = -T.min(weighted_clipped_probs, weighted_prob).mean()
For the critic loss, we know that we need to minimize the squared error of the values of our states, \((V_{des} - V_{pred})^2\),
where
\(V_{des} = A_n + V_{old}(x_n)\), the advantage plus the value recorded when the transition was collected
\(V_{pred}\) = the value predicted by the critic NN
This is done in code as
critic_values = self.critic(states)            # forward pass through the critic
critic_values = T.squeeze(critic_values)
# the regression target is the stored (old) value plus the computed advantage
desired_state_values = advantages[batch] + values[batch]
critic_loss = (desired_state_values - critic_values)**2
critic_loss = critic_loss.mean()
Then we calculate the total loss and do one step of gradient descent:
total_loss = actor_loss+critic_loss*0.5
self.actor.optimizer.zero_grad()
self.critic.optimizer.zero_grad()
total_loss.backward()
self.actor.optimizer.step()
self.critic.optimizer.step()
Learning Pseudo Code
PPO_agent_learn:
    for _ in num_of_epochs:
        compute the advantage of the states in memory
        generate batches
        iterate over the batches:
            calculate the cumulative actor loss for the batch
            calculate the cumulative critic loss for the batch
            do one gradient update
    clear memory
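Putting all the pieces above together, the agent's learn method can look roughly like this (a sketch assembled from the snippets in this post; attribute names such as self.memory, self.n_epochs and self.lambda_factor are my assumptions about the notebook's Agent class):

def learn(self):
    for _ in range(self.n_epochs):
        state_arr, action_arr, probs_arr, vals_arr, \
            rewards_arr, dones_arr, batches = self.memory.generate_batches()

        # GAE advantages, computed backwards over the whole rollout
        advantages = np.zeros_like(rewards_arr)
        for t in reversed(range(len(state_arr) - 1)):
            advantages[t] = rewards_arr[t] + self.gamma * vals_arr[t+1] * (1 - int(dones_arr[t])) \
                            - vals_arr[t] + self.gamma * self.lambda_factor * advantages[t+1]
        advantages = T.tensor(advantages).to(self.actor.device)
        values = T.tensor(vals_arr).to(self.actor.device)

        for batch in batches:
            states = T.tensor(state_arr[batch], dtype=T.float).to(self.actor.device)
            actions = T.tensor(action_arr[batch], dtype=T.float).to(self.actor.device)
            old_probs = T.tensor(probs_arr[batch], dtype=T.float).to(self.actor.device)

            # clipped surrogate actor loss
            dist = self.actor(states)
            new_probs = dist.log_prob(actions)
            prob_ratio = new_probs.exp() / old_probs.exp()
            weighted_prob = advantages[batch] * prob_ratio
            weighted_clipped_probs = T.clamp(prob_ratio, 1 - self.policy_clip,
                                             1 + self.policy_clip) * advantages[batch]
            actor_loss = -T.min(weighted_clipped_probs, weighted_prob).mean()

            # critic regression towards advantage + stored value
            critic_values = T.squeeze(self.critic(states))
            desired_state_values = advantages[batch] + values[batch]
            critic_loss = ((desired_state_values - critic_values) ** 2).mean()

            total_loss = actor_loss + 0.5 * critic_loss
            self.actor.optimizer.zero_grad()
            self.critic.optimizer.zero_grad()
            total_loss.backward()
            self.actor.optimizer.step()
            self.critic.optimizer.step()

    self.memory.clear_memory()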